mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Matthew Swift
08.09.2012 49a4f369aae1e601601cd6635dac71b38185d0ab
Partial fix for OPENDJ-422: Concurrent writes of large LDAP messages can become interleaved

Synchronize writes to Grizzly connections. Tests show no significant performance regressions.
1 files added
2 files modified
411 ■■■■■ changed files
opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPConnection.java 9 ●●●●● patch | view | raw | blame | history
opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPServerFilter.java 27 ●●●●● patch | view | raw | blame | history
opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/SynchronizedConnection.java 375 ●●●●● patch | view | raw | blame | history
opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPConnection.java
@@ -30,6 +30,7 @@
import static com.forgerock.opendj.ldap.SynchronizedConnection.synchronizeConnection;
import static org.forgerock.opendj.ldap.ErrorResultException.newErrorResult;
import java.io.IOException;
@@ -68,7 +69,7 @@
final class LDAPConnection extends AbstractAsynchronousConnection implements
    Connection
{
  private final org.glassfish.grizzly.Connection<?> connection;
  private final SynchronizedConnection<?> connection;
  private Result connectionInvalidReason;
  private FilterChain customFilterChain;
  private boolean isClosed = false;
@@ -96,7 +97,8 @@
  LDAPConnection(final org.glassfish.grizzly.Connection<?> connection,
      final LDAPOptions options)
  {
    this.connection = connection;
    // FIXME: remove synchronization when OPENDJ-422 is resolved.
    this.connection = synchronizeConnection(connection);
    this.options = options;
  }
@@ -1047,7 +1049,8 @@
        : cipherSuites.toArray(new String[cipherSuites.size()]));
    sslFilter = new SSLFilter(null, sslEngineConfigurator);
    installFilter(sslFilter);
    sslFilter.handshake(connection, completionHandler);
    sslFilter.handshake(connection.getUnsynchronizedConnection(),
        completionHandler);
  }
opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPServerFilter.java
@@ -31,6 +31,7 @@
import static com.forgerock.opendj.ldap.LDAPConstants.OID_NOTICE_OF_DISCONNECTION;
import static com.forgerock.opendj.ldap.SynchronizedConnection.synchronizeConnection;
import java.io.IOException;
import java.net.InetSocketAddress;
@@ -214,7 +215,8 @@
    private ClientContextImpl(final Connection<?> connection)
    {
      this.connection = connection;
      // FIXME: remove synchronization when OPENDJ-422 is resolved.
      this.connection = synchronizeConnection(connection);
    }
@@ -379,6 +381,13 @@
      this.serverConnection = serverConnection;
    }
    private Connection<?> getConnection()
    {
      return connection;
    }
  }
@@ -871,7 +880,7 @@
        final ServerConnection<Integer> conn = clientContext
            .getServerConnection();
        final AddHandler handler = new AddHandler(messageID,
            ctx.getConnection());
            clientContext.getConnection());
        conn.handleAdd(messageID, request, handler, handler);
      }
    }
@@ -890,7 +899,7 @@
        final ServerConnection<Integer> conn = clientContext
            .getServerConnection();
        final BindHandler handler = new BindHandler(messageID,
            ctx.getConnection());
            clientContext.getConnection());
        conn.handleBind(messageID, version, bindContext, handler, handler);
      }
    }
@@ -909,7 +918,7 @@
        final ServerConnection<Integer> conn = clientContext
            .getServerConnection();
        final CompareHandler handler = new CompareHandler(messageID,
            ctx.getConnection());
            clientContext.getConnection());
        conn.handleCompare(messageID, request, handler, handler);
      }
    }
@@ -928,7 +937,7 @@
        final ServerConnection<Integer> conn = clientContext
            .getServerConnection();
        final DeleteHandler handler = new DeleteHandler(messageID,
            ctx.getConnection());
            clientContext.getConnection());
        conn.handleDelete(messageID, request, handler, handler);
      }
    }
@@ -947,7 +956,7 @@
        final ServerConnection<Integer> conn = clientContext
            .getServerConnection();
        final ExtendedHandler<R> handler = new ExtendedHandler<R>(messageID,
            ctx.getConnection());
            clientContext.getConnection());
        conn.handleExtendedRequest(messageID, request, handler, handler);
      }
    }
@@ -966,7 +975,7 @@
        final ServerConnection<Integer> conn = clientContext
            .getServerConnection();
        final ModifyDNHandler handler = new ModifyDNHandler(messageID,
            ctx.getConnection());
            clientContext.getConnection());
        conn.handleModifyDN(messageID, request, handler, handler);
      }
    }
@@ -985,7 +994,7 @@
        final ServerConnection<Integer> conn = clientContext
            .getServerConnection();
        final ModifyHandler handler = new ModifyHandler(messageID,
            ctx.getConnection());
            clientContext.getConnection());
        conn.handleModify(messageID, request, handler, handler);
      }
    }
@@ -1004,7 +1013,7 @@
        final ServerConnection<Integer> conn = clientContext
            .getServerConnection();
        final SearchHandler handler = new SearchHandler(messageID,
            ctx.getConnection());
            clientContext.getConnection());
        conn.handleSearch(messageID, request, handler, handler);
      }
    }
opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/SynchronizedConnection.java
New file
@@ -0,0 +1,375 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * trunk/opendj3/legal-notices/CDDLv1_0.txt
 * or http://forgerock.org/license/CDDLv1.0.html.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * trunk/opendj3/legal-notices/CDDLv1_0.txt.  If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *      Copyright 2012 ForgeRock AS
 */
package com.forgerock.opendj.ldap;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.glassfish.grizzly.*;
import org.glassfish.grizzly.attributes.AttributeHolder;
import org.glassfish.grizzly.monitoring.MonitoringConfig;
/**
 * A Grizzly connection which synchronizes write requests in order to workaround
 * issue GRIZZLY-1191 (http://java.net/jira/browse/GRIZZLY-1191). See OPENDJ-422
 * (https://bugster.forgerock.org/jira/browse/OPENDJ-422) for more information.
 * <p>
 * This class should be removed once we issue GRIZZLY-422 is resolved and/or we
 * move to non-blocking IO (requires Grizzly 2.2).
 */
final class SynchronizedConnection<L> implements Connection<L>
{
  private final Connection<L> connection;
  private final Object writeLock = new Object();
  /**
   * Returns a synchronized view of the provided Grizzly connection.
   *
   * @param connection
   *          The Grizzly connection to be synchronized.
   * @return The synchronized view of the provided Grizzly connection.
   */
  static <L> SynchronizedConnection<L> synchronizeConnection(
      Connection<L> connection)
  {
    if (connection instanceof SynchronizedConnection)
    {
      return (SynchronizedConnection<L>) connection;
    }
    else
    {
      return new SynchronizedConnection<L>(connection);
    }
  }
  private SynchronizedConnection(Connection<L> connection)
  {
    this.connection = connection;
  }
  /**
   * Returns the underlying unsynchronized connection.
   *
   * @return The underlying unsynchronized connection.
   */
  Connection<L> getUnsynchronizedConnection()
  {
    return connection;
  }
  public <M> GrizzlyFuture<WriteResult<M, L>> write(M message)
      throws IOException
  {
    synchronized (writeLock)
    {
      return connection.write(message);
    }
  }
  public <M> GrizzlyFuture<ReadResult<M, L>> read() throws IOException
  {
    return connection.read();
  }
  public AttributeHolder getAttributes()
  {
    return connection.getAttributes();
  }
  public <M> GrizzlyFuture<ReadResult<M, L>> read(
      CompletionHandler<ReadResult<M, L>> completionHandler) throws IOException
  {
    return connection.read(completionHandler);
  }
  public Transport getTransport()
  {
    return connection.getTransport();
  }
  public <M> GrizzlyFuture<WriteResult<M, L>> write(M message,
      CompletionHandler<WriteResult<M, L>> completionHandler)
      throws IOException
  {
    synchronized (writeLock)
    {
      return connection.write(message, completionHandler);
    }
  }
  public boolean isOpen()
  {
    return connection.isOpen();
  }
  public void configureBlocking(boolean isBlocking)
  {
    connection.configureBlocking(isBlocking);
  }
  public <M> GrizzlyFuture<WriteResult<M, L>> write(L dstAddress, M message,
      CompletionHandler<WriteResult<M, L>> completionHandler)
      throws IOException
  {
    synchronized (writeLock)
    {
      return connection.write(dstAddress, message, completionHandler);
    }
  }
  public boolean isBlocking()
  {
    return connection.isBlocking();
  }
  public void configureStandalone(boolean isStandalone)
  {
    connection.configureStandalone(isStandalone);
  }
  public boolean isStandalone()
  {
    return connection.isStandalone();
  }
  @SuppressWarnings("rawtypes")
  public Processor obtainProcessor(IOEvent ioEvent)
  {
    return connection.obtainProcessor(ioEvent);
  }
  @SuppressWarnings("rawtypes")
  public Processor getProcessor()
  {
    return connection.getProcessor();
  }
  @SuppressWarnings("rawtypes")
  public void setProcessor(Processor preferableProcessor)
  {
    connection.setProcessor(preferableProcessor);
  }
  public ProcessorSelector getProcessorSelector()
  {
    return connection.getProcessorSelector();
  }
  public void setProcessorSelector(ProcessorSelector preferableProcessorSelector)
  {
    connection.setProcessorSelector(preferableProcessorSelector);
  }
  public L getPeerAddress()
  {
    return connection.getPeerAddress();
  }
  public L getLocalAddress()
  {
    return connection.getLocalAddress();
  }
  @SuppressWarnings("rawtypes")
  public GrizzlyFuture<Connection> close() throws IOException
  {
    synchronized (writeLock)
    {
      return connection.close();
    }
  }
  @SuppressWarnings("rawtypes")
  public GrizzlyFuture<Connection> close(
      CompletionHandler<Connection> completionHandler) throws IOException
  {
    synchronized (writeLock)
    {
      return connection.close(completionHandler);
    }
  }
  public int getReadBufferSize()
  {
    return connection.getReadBufferSize();
  }
  public void setReadBufferSize(int readBufferSize)
  {
    connection.setReadBufferSize(readBufferSize);
  }
  public int getWriteBufferSize()
  {
    return connection.getWriteBufferSize();
  }
  public void setWriteBufferSize(int writeBufferSize)
  {
    connection.setWriteBufferSize(writeBufferSize);
  }
  public long getReadTimeout(TimeUnit timeUnit)
  {
    return connection.getReadTimeout(timeUnit);
  }
  public void setReadTimeout(long timeout, TimeUnit timeUnit)
  {
    connection.setReadTimeout(timeout, timeUnit);
  }
  public long getWriteTimeout(TimeUnit timeUnit)
  {
    return connection.getWriteTimeout(timeUnit);
  }
  public void setWriteTimeout(long timeout, TimeUnit timeUnit)
  {
    connection.setWriteTimeout(timeout, timeUnit);
  }
  public void enableIOEvent(IOEvent ioEvent) throws IOException
  {
    connection.enableIOEvent(ioEvent);
  }
  public void disableIOEvent(IOEvent ioEvent) throws IOException
  {
    connection.disableIOEvent(ioEvent);
  }
  public MonitoringConfig<ConnectionProbe> getMonitoringConfig()
  {
    return connection.getMonitoringConfig();
  }
  public void addCloseListener(
      org.glassfish.grizzly.Connection.CloseListener closeListener)
  {
    connection.addCloseListener(closeListener);
  }
  public boolean removeCloseListener(
      org.glassfish.grizzly.Connection.CloseListener closeListener)
  {
    return connection.removeCloseListener(closeListener);
  }
  public void notifyConnectionError(Throwable error)
  {
    connection.notifyConnectionError(error);
  }
  public String toString()
  {
    return connection.toString();
  }
}