From 9e1f377c4f21b899d16f4c62450c68691f4b42a8 Mon Sep 17 00:00:00 2001
From: Ludovic Poitou <ludovic.poitou@forgerock.com>
Date: Thu, 20 Jun 2013 15:02:35 +0000
Subject: [PATCH] Fix for OPENDJ-846, Intermittent Replication failure. The issue was triggered by the mix of AssuredReplication and bad network conditions, which resulted in a deadlock between 2 RS, as both were blocked on writing to the TCP socket and not reading (because waiting on the write lock). The solution (more of a workaround) is to have another thread for sending data to the socket and have the reader and writer posting data to send to a queue that this new thread is polling. There are still potential deadlocks but they will occur much later, if the sendQueue gets full.  The code needs more work post 2.6 to be fully non blocking, but the changes are enough for now to resolve the customer deadlock case.

---
 opends/src/server/org/opends/server/replication/protocol/Session.java |  228 ++++++++++++++++++++++++++++++++++++++++++--------------
 1 files changed, 171 insertions(+), 57 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/protocol/TLSSocketSession.java b/opends/src/server/org/opends/server/replication/protocol/Session.java
similarity index 64%
rename from opends/src/server/org/opends/server/replication/protocol/TLSSocketSession.java
rename to opends/src/server/org/opends/server/replication/protocol/Session.java
index f27c950..6a67b2a 100644
--- a/opends/src/server/org/opends/server/replication/protocol/TLSSocketSession.java
+++ b/opends/src/server/org/opends/server/replication/protocol/Session.java
@@ -40,21 +40,25 @@
 import java.io.OutputStream;
 import java.net.Socket;
 import java.net.SocketException;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.zip.DataFormatException;
 
 import javax.net.ssl.SSLSocket;
 
+import org.opends.server.api.DirectoryThread;
 import org.opends.server.loggers.debug.DebugTracer;
 import org.opends.server.util.StaticUtils;
 
 
 
 /**
- * This class implements a protocol session using TLS.
+ * This class defines a replication session using TLS.
  */
-public final class TLSSocketSession implements ProtocolSession
+public final class Session extends DirectoryThread
 {
   /**
    * The tracer object for the debug logger.
@@ -86,7 +90,7 @@
    * connections.
    */
   private final Object stateLock = new Object();
-  private boolean closeInitiated = false;
+  private volatile boolean closeInitiated = false;
   private Throwable sessionError = null;
 
   /*
@@ -113,10 +117,13 @@
    */
   private BufferedOutputStream output;
 
-
+  private final LinkedBlockingQueue<byte[]> sendQueue =
+      new LinkedBlockingQueue<byte[]>(4000);
+  private AtomicBoolean isRunning = new AtomicBoolean(false);
+  private final CountDownLatch latch = new CountDownLatch(1);
 
   /**
-   * Creates a new TLSSocketSession.
+   * Creates a new Session.
    *
    * @param socket
    *          The regular Socket on which the SocketSession will be based.
@@ -125,13 +132,15 @@
    * @throws IOException
    *           When an IException happens on the socket.
    */
-  public TLSSocketSession(final Socket socket,
-      final SSLSocket secureSocket) throws IOException
+  public Session(final Socket socket,
+                 final SSLSocket secureSocket) throws IOException
   {
+    super("Replication Session from "+ socket.getLocalSocketAddress() +
+        " to " + socket.getRemoteSocketAddress());
     if (debugEnabled())
     {
       TRACER.debugInfo(
-          "Creating TLSSocketSession from %s to %s in %s",
+          "Creating Session from %s to %s in %s",
           socket.getLocalSocketAddress(),
           socket.getRemoteSocketAddress(),
           stackTraceToSingleLineString(new Exception()));
@@ -153,9 +162,9 @@
 
 
   /**
-   * {@inheritDoc}
+   * This method is called when the session with the remote must be closed.
+   * This object won't be used anymore after this method is called.
    */
-  @Override
   public void close()
   {
     Throwable localSessionError;
@@ -171,13 +180,21 @@
       closeInitiated = true;
     }
 
+    try {
+      this.interrupt();
+      this.join();
+    }
+    catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    }
+
     // Perform close outside of critical section.
     if (debugEnabled())
     {
       if (localSessionError == null)
       {
         TRACER.debugInfo(
-            "Closing TLSSocketSession from %s to %s in %s",
+            "Closing Session from %s to %s in %s",
             plainSocket.getLocalSocketAddress(),
             plainSocket.getRemoteSocketAddress(),
             stackTraceToSingleLineString(new Exception()));
@@ -185,7 +202,7 @@
       else
       {
         TRACER.debugInfo(
-            "Aborting TLSSocketSession from %s to %s in %s due to the "
+            "Aborting Session from %s to %s in %s due to the "
                 + "following error: %s",
             plainSocket.getLocalSocketAddress(),
             plainSocket.getRemoteSocketAddress(),
@@ -199,20 +216,13 @@
     {
       if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
       {
-        if (publishLock.tryLock())
+        try
         {
-          try
-          {
-            publish(new StopMsg());
-          }
-          catch (final IOException ignored)
-          {
-            // Ignore errors on close.
-          }
-          finally
-          {
-            publishLock.unlock();
-          }
+          publish(new StopMsg());
+        }
+        catch (final IOException ignored)
+        {
+          // Ignore errors on close.
         }
       }
     }
@@ -223,9 +233,12 @@
 
 
   /**
-   * {@inheritDoc}
+   * This methods allows to determine if the session close was initiated
+   * on this Session.
+   *
+   * @return A boolean allowing to determine if the session close was initiated
+   * on this Session.
    */
-  @Override
   public boolean closeInitiated()
   {
     synchronized (stateLock)
@@ -237,9 +250,10 @@
 
 
   /**
-   * {@inheritDoc}
+   * Gets the time the last replication message was published on this
+   * session.
+   * @return The timestamp in milliseconds of the last message published.
    */
-  @Override
   public long getLastPublishTime()
   {
     return lastPublishTime;
@@ -248,9 +262,10 @@
 
 
   /**
-   * {@inheritDoc}
+   * Gets the time the last replication message was received on this
+   * session.
+   * @return The timestamp in milliseconds of the last message received.
    */
-  @Override
   public long getLastReceiveTime()
   {
     if (lastReceiveTime == 0)
@@ -263,9 +278,10 @@
 
 
   /**
-   * {@inheritDoc}
+   * Retrieve the local URL in the form host:port.
+   *
+   * @return The local URL.
    */
-  @Override
   public String getLocalUrl()
   {
     return localUrl;
@@ -274,9 +290,10 @@
 
 
   /**
-   * {@inheritDoc}
+   * Retrieve the human readable address of the remote server.
+   *
+   * @return The human readable address of the remote server.
    */
-  @Override
   public String getReadableRemoteAddress()
   {
     return readableRemoteAddress;
@@ -285,9 +302,10 @@
 
 
   /**
-   * {@inheritDoc}
+   * Retrieve the IP address of the remote server.
+   *
+   * @return The IP address of the remote server.
    */
-  @Override
   public String getRemoteAddress()
   {
     return remoteAddress;
@@ -296,9 +314,9 @@
 
 
   /**
-   * {@inheritDoc}
+   * Determine whether the session is using a security layer.
+   * @return true if the connection is encrypted, false otherwise.
    */
-  @Override
   public boolean isEncrypted()
   {
     return isEncrypted;
@@ -307,12 +325,38 @@
 
 
   /**
-   * {@inheritDoc}
+   * Sends a replication message to the remote peer.
+   *
+   * @param msg
+   *          The message to be sent.
+   * @throws IOException
+   *           If an IO error occurred.
    */
-  @Override
   public void publish(final ReplicationMsg msg) throws IOException
   {
     final byte[] buffer = msg.getBytes(protocolVersion);
+    if (isRunning.get())
+    {
+      try {
+        sendQueue.put(buffer);
+      }
+      catch (final InterruptedException e) {
+        setSessionError(e);
+        throw new IOException(e.getMessage());
+      }
+    } else {
+      send(buffer);
+    }
+  }
+
+  /** Sends a replication message already encoded to the socket.
+   *
+   * @param buffer
+   *          the encoded buffer
+   * @throws IOException if the message could not be sent
+   */
+  private void send(final byte[] buffer) throws IOException
+  {
     final String str = String.format("%08x", buffer.length);
     final byte[] sendLengthBuf = str.getBytes();
 
@@ -326,9 +370,7 @@
       output.write(sendLengthBuf);
       output.write(buffer);
       output.flush();
-    }
-    catch (final IOException e)
-    {
+    } catch (final IOException e) {
       setSessionError(e);
       throw e;
     }
@@ -343,9 +385,20 @@
 
 
   /**
-   * {@inheritDoc}
+   * Attempt to receive a ReplicationMsg.
+   * This method should block the calling thread until a
+   * ReplicationMsg is available or until an error condition.
+   *
+   * This method can only be called by a single thread and therefore does not
+   * need to implement any replication.
+   *
+   * @return The ReplicationMsg that was received.
+   * @throws IOException When error happened during IO process.
+   * @throws DataFormatException When the data received is not formatted as a
+   *         ReplicationMsg.
+   * @throws NotSupportedOldVersionPDUException If the received PDU is part of
+   * an old protocol version and we do not support it.
    */
-  @Override
   public ReplicationMsg receive() throws IOException,
       DataFormatException, NotSupportedOldVersionPDUException
   {
@@ -432,22 +485,23 @@
   }
 
 
-
   /**
-   * {@inheritDoc}
+   * This method is called at the establishment of the session and can
+   * be used to record the version of the protocol that is currently used.
+   *
+   * @param version The version of the protocol that is currently used.
    */
-  @Override
   public void setProtocolVersion(final short version)
   {
     protocolVersion = version;
   }
 
 
-
   /**
-   * {@inheritDoc}
+   * Returns the version of the protocol that is currently used.
+   *
+   * @return The version of the protocol that is currently used.
    */
-  @Override
   public short getProtocolVersion()
   {
     return protocolVersion;
@@ -456,9 +510,16 @@
 
 
   /**
-   * {@inheritDoc}
+   * Set a timeout value.
+   * With this option set to a non-zero value, calls to the receive() method
+   * block for only this amount of time after which a
+   * java.net.SocketTimeoutException is raised.
+   * The Broker is valid and usable even after such an Exception is raised.
+   *
+   * @param timeout the specified timeout, in milliseconds.
+   * @throws SocketException if there is an error in the underlying protocol,
+   *         such as a TCP error.
    */
-  @Override
   public void setSoTimeout(final int timeout) throws SocketException
   {
     plainSocket.setSoTimeout(timeout);
@@ -467,10 +528,8 @@
 
 
   /**
-   * {@inheritDoc}
+   * Stop using the security layer, if there is any.
    */
-  @SuppressWarnings("unused")
-  @Override
   public void stopEncryption()
   {
     /*
@@ -500,4 +559,59 @@
       }
     }
   }
+
+  /**
+   * Run method for the Session.
+   * Loops waiting for buffers from the queue and sends them when available.
+   */
+  public void run()
+  {
+    isRunning.set(true);
+    latch.countDown();
+    if (debugEnabled())
+    {
+      TRACER.debugInfo(this.getName() + " starting.");
+    }
+    boolean needClosing = false;
+    while (!closeInitiated)
+    {
+      byte[] buffer;
+      try
+      {
+        buffer = sendQueue.take();
+      }
+      catch (InterruptedException ie)
+      {
+        break;
+      }
+      try
+      {
+        send(buffer);
+      }
+      catch (IOException e)
+      {
+        setSessionError(e);
+        needClosing = true;
+      }
+    }
+    isRunning.set(false);
+    if (needClosing)
+    {
+      close();
+    }
+    if (debugEnabled())
+    {
+      TRACER.debugInfo(this.getName() + " stopped.");
+    }
+  }
+
+  /**
+   * This method can be called to wait until the session thread is
+   * properly started.
+   * @throws InterruptedException when interrupted
+   */
+  public void waitForStartup() throws InterruptedException
+  {
+    latch.await();
+  }
 }

--
Gitblit v1.10.0