From d927b1c3a2f387982bf872785fd16f913c4dc63f Mon Sep 17 00:00:00 2001
From: Matthew Swift <matthew.swift@forgerock.com>
Date: Tue, 07 Feb 2012 23:09:17 +0000
Subject: [PATCH] Partial fix for OPENDJ-422: Concurrent writes of large LDAP messages can become interleaved
---
opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPServerFilter.java | 27 ++-
opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPConnection.java | 9
opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/SynchronizedConnection.java | 375 +++++++++++++++++++++++++++++++++++++++++++++++++++++
3 files changed, 399 insertions(+), 12 deletions(-)
diff --git a/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPConnection.java b/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPConnection.java
index 9624463..8894e8c 100644
--- a/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPConnection.java
+++ b/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPConnection.java
@@ -30,6 +30,7 @@
+import static com.forgerock.opendj.ldap.SynchronizedConnection.synchronizeConnection;
import static org.forgerock.opendj.ldap.ErrorResultException.newErrorResult;
import java.io.IOException;
@@ -68,7 +69,7 @@
final class LDAPConnection extends AbstractAsynchronousConnection implements
Connection
{
- private final org.glassfish.grizzly.Connection<?> connection;
+ private final SynchronizedConnection<?> connection;
private Result connectionInvalidReason;
private FilterChain customFilterChain;
private boolean isClosed = false;
@@ -96,7 +97,8 @@
LDAPConnection(final org.glassfish.grizzly.Connection<?> connection,
final LDAPOptions options)
{
- this.connection = connection;
+ // FIXME: remove synchronization when OPENDJ-422 is resolved.
+ this.connection = synchronizeConnection(connection);
this.options = options;
}
@@ -1047,7 +1049,8 @@
: cipherSuites.toArray(new String[cipherSuites.size()]));
sslFilter = new SSLFilter(null, sslEngineConfigurator);
installFilter(sslFilter);
- sslFilter.handshake(connection, completionHandler);
+ sslFilter.handshake(connection.getUnsynchronizedConnection(),
+ completionHandler);
}
diff --git a/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPServerFilter.java b/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPServerFilter.java
index c7faa29..c9b5564 100644
--- a/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPServerFilter.java
+++ b/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPServerFilter.java
@@ -31,6 +31,7 @@
import static com.forgerock.opendj.ldap.LDAPConstants.OID_NOTICE_OF_DISCONNECTION;
+import static com.forgerock.opendj.ldap.SynchronizedConnection.synchronizeConnection;
import java.io.IOException;
import java.net.InetSocketAddress;
@@ -214,7 +215,8 @@
private ClientContextImpl(final Connection<?> connection)
{
- this.connection = connection;
+ // FIXME: remove synchronization when OPENDJ-422 is resolved.
+ this.connection = synchronizeConnection(connection);
}
@@ -379,6 +381,13 @@
this.serverConnection = serverConnection;
}
+
+
+ private Connection<?> getConnection()
+ {
+ return connection;
+ }
+
}
@@ -871,7 +880,7 @@
final ServerConnection<Integer> conn = clientContext
.getServerConnection();
final AddHandler handler = new AddHandler(messageID,
- ctx.getConnection());
+ clientContext.getConnection());
conn.handleAdd(messageID, request, handler, handler);
}
}
@@ -890,7 +899,7 @@
final ServerConnection<Integer> conn = clientContext
.getServerConnection();
final BindHandler handler = new BindHandler(messageID,
- ctx.getConnection());
+ clientContext.getConnection());
conn.handleBind(messageID, version, bindContext, handler, handler);
}
}
@@ -909,7 +918,7 @@
final ServerConnection<Integer> conn = clientContext
.getServerConnection();
final CompareHandler handler = new CompareHandler(messageID,
- ctx.getConnection());
+ clientContext.getConnection());
conn.handleCompare(messageID, request, handler, handler);
}
}
@@ -928,7 +937,7 @@
final ServerConnection<Integer> conn = clientContext
.getServerConnection();
final DeleteHandler handler = new DeleteHandler(messageID,
- ctx.getConnection());
+ clientContext.getConnection());
conn.handleDelete(messageID, request, handler, handler);
}
}
@@ -947,7 +956,7 @@
final ServerConnection<Integer> conn = clientContext
.getServerConnection();
final ExtendedHandler<R> handler = new ExtendedHandler<R>(messageID,
- ctx.getConnection());
+ clientContext.getConnection());
conn.handleExtendedRequest(messageID, request, handler, handler);
}
}
@@ -966,7 +975,7 @@
final ServerConnection<Integer> conn = clientContext
.getServerConnection();
final ModifyDNHandler handler = new ModifyDNHandler(messageID,
- ctx.getConnection());
+ clientContext.getConnection());
conn.handleModifyDN(messageID, request, handler, handler);
}
}
@@ -985,7 +994,7 @@
final ServerConnection<Integer> conn = clientContext
.getServerConnection();
final ModifyHandler handler = new ModifyHandler(messageID,
- ctx.getConnection());
+ clientContext.getConnection());
conn.handleModify(messageID, request, handler, handler);
}
}
@@ -1004,7 +1013,7 @@
final ServerConnection<Integer> conn = clientContext
.getServerConnection();
final SearchHandler handler = new SearchHandler(messageID,
- ctx.getConnection());
+ clientContext.getConnection());
conn.handleSearch(messageID, request, handler, handler);
}
}
diff --git a/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/SynchronizedConnection.java b/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/SynchronizedConnection.java
new file mode 100644
index 0000000..daafc7e
--- /dev/null
+++ b/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/SynchronizedConnection.java
@@ -0,0 +1,375 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at
+ * trunk/opendj3/legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at
+ * trunk/opendj3/legal-notices/CDDLv1_0.txt. If applicable,
+ * add the following below this CDDL HEADER, with the fields enclosed
+ * by brackets "[]" replaced with your own identifying information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ * Copyright 2012 ForgeRock AS
+ */
+
+package com.forgerock.opendj.ldap;
+
+
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+import org.glassfish.grizzly.*;
+import org.glassfish.grizzly.attributes.AttributeHolder;
+import org.glassfish.grizzly.monitoring.MonitoringConfig;
+
+
+
+/**
+ * A Grizzly connection which synchronizes write requests in order to workaround
+ * issue GRIZZLY-1191 (http://java.net/jira/browse/GRIZZLY-1191). See OPENDJ-422
+ * (https://bugster.forgerock.org/jira/browse/OPENDJ-422) for more information.
+ * <p>
+ * This class should be removed once we issue GRIZZLY-422 is resolved and/or we
+ * move to non-blocking IO (requires Grizzly 2.2).
+ */
+final class SynchronizedConnection<L> implements Connection<L>
+{
+ private final Connection<L> connection;
+ private final Object writeLock = new Object();
+
+
+
+ /**
+ * Returns a synchronized view of the provided Grizzly connection.
+ *
+ * @param connection
+ * The Grizzly connection to be synchronized.
+ * @return The synchronized view of the provided Grizzly connection.
+ */
+ static <L> SynchronizedConnection<L> synchronizeConnection(
+ Connection<L> connection)
+ {
+ if (connection instanceof SynchronizedConnection)
+ {
+ return (SynchronizedConnection<L>) connection;
+ }
+ else
+ {
+ return new SynchronizedConnection<L>(connection);
+ }
+ }
+
+
+
+ private SynchronizedConnection(Connection<L> connection)
+ {
+ this.connection = connection;
+ }
+
+
+
+ /**
+ * Returns the underlying unsynchronized connection.
+ *
+ * @return The underlying unsynchronized connection.
+ */
+ Connection<L> getUnsynchronizedConnection()
+ {
+ return connection;
+ }
+
+
+
+ public <M> GrizzlyFuture<WriteResult<M, L>> write(M message)
+ throws IOException
+ {
+ synchronized (writeLock)
+ {
+ return connection.write(message);
+ }
+ }
+
+
+
+ public <M> GrizzlyFuture<ReadResult<M, L>> read() throws IOException
+ {
+ return connection.read();
+ }
+
+
+
+ public AttributeHolder getAttributes()
+ {
+ return connection.getAttributes();
+ }
+
+
+
+ public <M> GrizzlyFuture<ReadResult<M, L>> read(
+ CompletionHandler<ReadResult<M, L>> completionHandler) throws IOException
+ {
+ return connection.read(completionHandler);
+ }
+
+
+
+ public Transport getTransport()
+ {
+ return connection.getTransport();
+ }
+
+
+
+ public <M> GrizzlyFuture<WriteResult<M, L>> write(M message,
+ CompletionHandler<WriteResult<M, L>> completionHandler)
+ throws IOException
+ {
+ synchronized (writeLock)
+ {
+ return connection.write(message, completionHandler);
+ }
+ }
+
+
+
+ public boolean isOpen()
+ {
+ return connection.isOpen();
+ }
+
+
+
+ public void configureBlocking(boolean isBlocking)
+ {
+ connection.configureBlocking(isBlocking);
+ }
+
+
+
+ public <M> GrizzlyFuture<WriteResult<M, L>> write(L dstAddress, M message,
+ CompletionHandler<WriteResult<M, L>> completionHandler)
+ throws IOException
+ {
+ synchronized (writeLock)
+ {
+ return connection.write(dstAddress, message, completionHandler);
+ }
+ }
+
+
+
+ public boolean isBlocking()
+ {
+ return connection.isBlocking();
+ }
+
+
+
+ public void configureStandalone(boolean isStandalone)
+ {
+ connection.configureStandalone(isStandalone);
+ }
+
+
+
+ public boolean isStandalone()
+ {
+ return connection.isStandalone();
+ }
+
+
+
+ @SuppressWarnings("rawtypes")
+ public Processor obtainProcessor(IOEvent ioEvent)
+ {
+ return connection.obtainProcessor(ioEvent);
+ }
+
+
+
+ @SuppressWarnings("rawtypes")
+ public Processor getProcessor()
+ {
+ return connection.getProcessor();
+ }
+
+
+
+ @SuppressWarnings("rawtypes")
+ public void setProcessor(Processor preferableProcessor)
+ {
+ connection.setProcessor(preferableProcessor);
+ }
+
+
+
+ public ProcessorSelector getProcessorSelector()
+ {
+ return connection.getProcessorSelector();
+ }
+
+
+
+ public void setProcessorSelector(ProcessorSelector preferableProcessorSelector)
+ {
+ connection.setProcessorSelector(preferableProcessorSelector);
+ }
+
+
+
+ public L getPeerAddress()
+ {
+ return connection.getPeerAddress();
+ }
+
+
+
+ public L getLocalAddress()
+ {
+ return connection.getLocalAddress();
+ }
+
+
+
+ @SuppressWarnings("rawtypes")
+ public GrizzlyFuture<Connection> close() throws IOException
+ {
+ synchronized (writeLock)
+ {
+ return connection.close();
+ }
+ }
+
+
+
+ @SuppressWarnings("rawtypes")
+ public GrizzlyFuture<Connection> close(
+ CompletionHandler<Connection> completionHandler) throws IOException
+ {
+ synchronized (writeLock)
+ {
+ return connection.close(completionHandler);
+ }
+ }
+
+
+
+ public int getReadBufferSize()
+ {
+ return connection.getReadBufferSize();
+ }
+
+
+
+ public void setReadBufferSize(int readBufferSize)
+ {
+ connection.setReadBufferSize(readBufferSize);
+ }
+
+
+
+ public int getWriteBufferSize()
+ {
+ return connection.getWriteBufferSize();
+ }
+
+
+
+ public void setWriteBufferSize(int writeBufferSize)
+ {
+ connection.setWriteBufferSize(writeBufferSize);
+ }
+
+
+
+ public long getReadTimeout(TimeUnit timeUnit)
+ {
+ return connection.getReadTimeout(timeUnit);
+ }
+
+
+
+ public void setReadTimeout(long timeout, TimeUnit timeUnit)
+ {
+ connection.setReadTimeout(timeout, timeUnit);
+ }
+
+
+
+ public long getWriteTimeout(TimeUnit timeUnit)
+ {
+ return connection.getWriteTimeout(timeUnit);
+ }
+
+
+
+ public void setWriteTimeout(long timeout, TimeUnit timeUnit)
+ {
+ connection.setWriteTimeout(timeout, timeUnit);
+ }
+
+
+
+ public void enableIOEvent(IOEvent ioEvent) throws IOException
+ {
+ connection.enableIOEvent(ioEvent);
+ }
+
+
+
+ public void disableIOEvent(IOEvent ioEvent) throws IOException
+ {
+ connection.disableIOEvent(ioEvent);
+ }
+
+
+
+ public MonitoringConfig<ConnectionProbe> getMonitoringConfig()
+ {
+ return connection.getMonitoringConfig();
+ }
+
+
+
+ public void addCloseListener(
+ org.glassfish.grizzly.Connection.CloseListener closeListener)
+ {
+ connection.addCloseListener(closeListener);
+ }
+
+
+
+ public boolean removeCloseListener(
+ org.glassfish.grizzly.Connection.CloseListener closeListener)
+ {
+ return connection.removeCloseListener(closeListener);
+ }
+
+
+
+ public void notifyConnectionError(Throwable error)
+ {
+ connection.notifyConnectionError(error);
+ }
+
+
+
+ public String toString()
+ {
+ return connection.toString();
+ }
+
+}
--
Gitblit v1.10.0