From d927b1c3a2f387982bf872785fd16f913c4dc63f Mon Sep 17 00:00:00 2001
From: Matthew Swift <matthew.swift@forgerock.com>
Date: Tue, 07 Feb 2012 23:09:17 +0000
Subject: [PATCH] Partial fix for OPENDJ-422: Concurrent writes of large LDAP messages can become interleaved

---
 opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPServerFilter.java       |   27 ++-
 opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPConnection.java         |    9 
 opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/SynchronizedConnection.java |  375 +++++++++++++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 399 insertions(+), 12 deletions(-)

diff --git a/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPConnection.java b/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPConnection.java
index 9624463..8894e8c 100644
--- a/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPConnection.java
+++ b/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPConnection.java
@@ -30,6 +30,7 @@
 
 
 
+import static com.forgerock.opendj.ldap.SynchronizedConnection.synchronizeConnection;
 import static org.forgerock.opendj.ldap.ErrorResultException.newErrorResult;
 
 import java.io.IOException;
@@ -68,7 +69,7 @@
 final class LDAPConnection extends AbstractAsynchronousConnection implements
     Connection
 {
-  private final org.glassfish.grizzly.Connection<?> connection;
+  private final SynchronizedConnection<?> connection;
   private Result connectionInvalidReason;
   private FilterChain customFilterChain;
   private boolean isClosed = false;
@@ -96,7 +97,8 @@
   LDAPConnection(final org.glassfish.grizzly.Connection<?> connection,
       final LDAPOptions options)
   {
-    this.connection = connection;
+    // FIXME: remove synchronization when OPENDJ-422 is resolved.
+    this.connection = synchronizeConnection(connection);
     this.options = options;
   }
 
@@ -1047,7 +1049,8 @@
         : cipherSuites.toArray(new String[cipherSuites.size()]));
     sslFilter = new SSLFilter(null, sslEngineConfigurator);
     installFilter(sslFilter);
-    sslFilter.handshake(connection, completionHandler);
+    sslFilter.handshake(connection.getUnsynchronizedConnection(),
+        completionHandler);
   }
 
 
diff --git a/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPServerFilter.java b/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPServerFilter.java
index c7faa29..c9b5564 100644
--- a/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPServerFilter.java
+++ b/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPServerFilter.java
@@ -31,6 +31,7 @@
 
 
 import static com.forgerock.opendj.ldap.LDAPConstants.OID_NOTICE_OF_DISCONNECTION;
+import static com.forgerock.opendj.ldap.SynchronizedConnection.synchronizeConnection;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -214,7 +215,8 @@
 
     private ClientContextImpl(final Connection<?> connection)
     {
-      this.connection = connection;
+      // FIXME: remove synchronization when OPENDJ-422 is resolved.
+      this.connection = synchronizeConnection(connection);
     }
 
 
@@ -379,6 +381,13 @@
       this.serverConnection = serverConnection;
     }
 
+
+
+    private Connection<?> getConnection()
+    {
+      return connection;
+    }
+
   }
 
 
@@ -871,7 +880,7 @@
         final ServerConnection<Integer> conn = clientContext
             .getServerConnection();
         final AddHandler handler = new AddHandler(messageID,
-            ctx.getConnection());
+            clientContext.getConnection());
         conn.handleAdd(messageID, request, handler, handler);
       }
     }
@@ -890,7 +899,7 @@
         final ServerConnection<Integer> conn = clientContext
             .getServerConnection();
         final BindHandler handler = new BindHandler(messageID,
-            ctx.getConnection());
+            clientContext.getConnection());
         conn.handleBind(messageID, version, bindContext, handler, handler);
       }
     }
@@ -909,7 +918,7 @@
         final ServerConnection<Integer> conn = clientContext
             .getServerConnection();
         final CompareHandler handler = new CompareHandler(messageID,
-            ctx.getConnection());
+            clientContext.getConnection());
         conn.handleCompare(messageID, request, handler, handler);
       }
     }
@@ -928,7 +937,7 @@
         final ServerConnection<Integer> conn = clientContext
             .getServerConnection();
         final DeleteHandler handler = new DeleteHandler(messageID,
-            ctx.getConnection());
+            clientContext.getConnection());
         conn.handleDelete(messageID, request, handler, handler);
       }
     }
@@ -947,7 +956,7 @@
         final ServerConnection<Integer> conn = clientContext
             .getServerConnection();
         final ExtendedHandler<R> handler = new ExtendedHandler<R>(messageID,
-            ctx.getConnection());
+            clientContext.getConnection());
         conn.handleExtendedRequest(messageID, request, handler, handler);
       }
     }
@@ -966,7 +975,7 @@
         final ServerConnection<Integer> conn = clientContext
             .getServerConnection();
         final ModifyDNHandler handler = new ModifyDNHandler(messageID,
-            ctx.getConnection());
+            clientContext.getConnection());
         conn.handleModifyDN(messageID, request, handler, handler);
       }
     }
@@ -985,7 +994,7 @@
         final ServerConnection<Integer> conn = clientContext
             .getServerConnection();
         final ModifyHandler handler = new ModifyHandler(messageID,
-            ctx.getConnection());
+            clientContext.getConnection());
         conn.handleModify(messageID, request, handler, handler);
       }
     }
@@ -1004,7 +1013,7 @@
         final ServerConnection<Integer> conn = clientContext
             .getServerConnection();
         final SearchHandler handler = new SearchHandler(messageID,
-            ctx.getConnection());
+            clientContext.getConnection());
         conn.handleSearch(messageID, request, handler, handler);
       }
     }
diff --git a/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/SynchronizedConnection.java b/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/SynchronizedConnection.java
new file mode 100644
index 0000000..daafc7e
--- /dev/null
+++ b/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/SynchronizedConnection.java
@@ -0,0 +1,375 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License").  You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at
+ * trunk/opendj3/legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at
+ * trunk/opendj3/legal-notices/CDDLv1_0.txt.  If applicable,
+ * add the following below this CDDL HEADER, with the fields enclosed
+ * by brackets "[]" replaced with your own identifying information:
+ *      Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *      Copyright 2012 ForgeRock AS
+ */
+
+package com.forgerock.opendj.ldap;
+
+
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+import org.glassfish.grizzly.*;
+import org.glassfish.grizzly.attributes.AttributeHolder;
+import org.glassfish.grizzly.monitoring.MonitoringConfig;
+
+
+
+/**
+ * A Grizzly connection which synchronizes write requests in order to workaround
+ * issue GRIZZLY-1191 (http://java.net/jira/browse/GRIZZLY-1191). See OPENDJ-422
+ * (https://bugster.forgerock.org/jira/browse/OPENDJ-422) for more information.
+ * <p>
+ * This class should be removed once we issue GRIZZLY-422 is resolved and/or we
+ * move to non-blocking IO (requires Grizzly 2.2).
+ */
+final class SynchronizedConnection<L> implements Connection<L>
+{
+  private final Connection<L> connection;
+  private final Object writeLock = new Object();
+
+
+
+  /**
+   * Returns a synchronized view of the provided Grizzly connection.
+   *
+   * @param connection
+   *          The Grizzly connection to be synchronized.
+   * @return The synchronized view of the provided Grizzly connection.
+   */
+  static <L> SynchronizedConnection<L> synchronizeConnection(
+      Connection<L> connection)
+  {
+    if (connection instanceof SynchronizedConnection)
+    {
+      return (SynchronizedConnection<L>) connection;
+    }
+    else
+    {
+      return new SynchronizedConnection<L>(connection);
+    }
+  }
+
+
+
+  private SynchronizedConnection(Connection<L> connection)
+  {
+    this.connection = connection;
+  }
+
+
+
+  /**
+   * Returns the underlying unsynchronized connection.
+   *
+   * @return The underlying unsynchronized connection.
+   */
+  Connection<L> getUnsynchronizedConnection()
+  {
+    return connection;
+  }
+
+
+
+  public <M> GrizzlyFuture<WriteResult<M, L>> write(M message)
+      throws IOException
+  {
+    synchronized (writeLock)
+    {
+      return connection.write(message);
+    }
+  }
+
+
+
+  public <M> GrizzlyFuture<ReadResult<M, L>> read() throws IOException
+  {
+    return connection.read();
+  }
+
+
+
+  public AttributeHolder getAttributes()
+  {
+    return connection.getAttributes();
+  }
+
+
+
+  public <M> GrizzlyFuture<ReadResult<M, L>> read(
+      CompletionHandler<ReadResult<M, L>> completionHandler) throws IOException
+  {
+    return connection.read(completionHandler);
+  }
+
+
+
+  public Transport getTransport()
+  {
+    return connection.getTransport();
+  }
+
+
+
+  public <M> GrizzlyFuture<WriteResult<M, L>> write(M message,
+      CompletionHandler<WriteResult<M, L>> completionHandler)
+      throws IOException
+  {
+    synchronized (writeLock)
+    {
+      return connection.write(message, completionHandler);
+    }
+  }
+
+
+
+  public boolean isOpen()
+  {
+    return connection.isOpen();
+  }
+
+
+
+  public void configureBlocking(boolean isBlocking)
+  {
+    connection.configureBlocking(isBlocking);
+  }
+
+
+
+  public <M> GrizzlyFuture<WriteResult<M, L>> write(L dstAddress, M message,
+      CompletionHandler<WriteResult<M, L>> completionHandler)
+      throws IOException
+  {
+    synchronized (writeLock)
+    {
+      return connection.write(dstAddress, message, completionHandler);
+    }
+  }
+
+
+
+  public boolean isBlocking()
+  {
+    return connection.isBlocking();
+  }
+
+
+
+  public void configureStandalone(boolean isStandalone)
+  {
+    connection.configureStandalone(isStandalone);
+  }
+
+
+
+  public boolean isStandalone()
+  {
+    return connection.isStandalone();
+  }
+
+
+
+  @SuppressWarnings("rawtypes")
+  public Processor obtainProcessor(IOEvent ioEvent)
+  {
+    return connection.obtainProcessor(ioEvent);
+  }
+
+
+
+  @SuppressWarnings("rawtypes")
+  public Processor getProcessor()
+  {
+    return connection.getProcessor();
+  }
+
+
+
+  @SuppressWarnings("rawtypes")
+  public void setProcessor(Processor preferableProcessor)
+  {
+    connection.setProcessor(preferableProcessor);
+  }
+
+
+
+  public ProcessorSelector getProcessorSelector()
+  {
+    return connection.getProcessorSelector();
+  }
+
+
+
+  public void setProcessorSelector(ProcessorSelector preferableProcessorSelector)
+  {
+    connection.setProcessorSelector(preferableProcessorSelector);
+  }
+
+
+
+  public L getPeerAddress()
+  {
+    return connection.getPeerAddress();
+  }
+
+
+
+  public L getLocalAddress()
+  {
+    return connection.getLocalAddress();
+  }
+
+
+
+  @SuppressWarnings("rawtypes")
+  public GrizzlyFuture<Connection> close() throws IOException
+  {
+    synchronized (writeLock)
+    {
+      return connection.close();
+    }
+  }
+
+
+
+  @SuppressWarnings("rawtypes")
+  public GrizzlyFuture<Connection> close(
+      CompletionHandler<Connection> completionHandler) throws IOException
+  {
+    synchronized (writeLock)
+    {
+      return connection.close(completionHandler);
+    }
+  }
+
+
+
+  public int getReadBufferSize()
+  {
+    return connection.getReadBufferSize();
+  }
+
+
+
+  public void setReadBufferSize(int readBufferSize)
+  {
+    connection.setReadBufferSize(readBufferSize);
+  }
+
+
+
+  public int getWriteBufferSize()
+  {
+    return connection.getWriteBufferSize();
+  }
+
+
+
+  public void setWriteBufferSize(int writeBufferSize)
+  {
+    connection.setWriteBufferSize(writeBufferSize);
+  }
+
+
+
+  public long getReadTimeout(TimeUnit timeUnit)
+  {
+    return connection.getReadTimeout(timeUnit);
+  }
+
+
+
+  public void setReadTimeout(long timeout, TimeUnit timeUnit)
+  {
+    connection.setReadTimeout(timeout, timeUnit);
+  }
+
+
+
+  public long getWriteTimeout(TimeUnit timeUnit)
+  {
+    return connection.getWriteTimeout(timeUnit);
+  }
+
+
+
+  public void setWriteTimeout(long timeout, TimeUnit timeUnit)
+  {
+    connection.setWriteTimeout(timeout, timeUnit);
+  }
+
+
+
+  public void enableIOEvent(IOEvent ioEvent) throws IOException
+  {
+    connection.enableIOEvent(ioEvent);
+  }
+
+
+
+  public void disableIOEvent(IOEvent ioEvent) throws IOException
+  {
+    connection.disableIOEvent(ioEvent);
+  }
+
+
+
+  public MonitoringConfig<ConnectionProbe> getMonitoringConfig()
+  {
+    return connection.getMonitoringConfig();
+  }
+
+
+
+  public void addCloseListener(
+      org.glassfish.grizzly.Connection.CloseListener closeListener)
+  {
+    connection.addCloseListener(closeListener);
+  }
+
+
+
+  public boolean removeCloseListener(
+      org.glassfish.grizzly.Connection.CloseListener closeListener)
+  {
+    return connection.removeCloseListener(closeListener);
+  }
+
+
+
+  public void notifyConnectionError(Throwable error)
+  {
+    connection.notifyConnectionError(error);
+  }
+
+
+
+  public String toString()
+  {
+    return connection.toString();
+  }
+
+}

--
Gitblit v1.10.0