From 9e1f377c4f21b899d16f4c62450c68691f4b42a8 Mon Sep 17 00:00:00 2001
From: Ludovic Poitou <ludovic.poitou@forgerock.com>
Date: Thu, 20 Jun 2013 15:02:35 +0000
Subject: [PATCH] Fix for OPENDJ-846, Intermittent Replication failure. The issue was triggered by the mix of AssuredReplication and bad network conditions, which resulted in a deadlock between 2 RS, as both were blocked on writing to the TCP socket and not reading (because waiting on the write lock). The solution (more of a workaround) is to have another thread for sending data to the socket and have the reader and writer posting data to send to a queue that this new thread is polling. There are still potential deadlocks but they will occur much later, if the sendQueue gets full.  The code needs more work post 2.6 to be fully non blocking, but the changes are enough for now to resolve the customer deadlock case.

---
 opends/src/server/org/opends/server/replication/server/ServerHandler.java |   28 ++++++++++++++++++++++------
 1 files changed, 22 insertions(+), 6 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/server/ServerHandler.java b/opends/src/server/org/opends/server/replication/server/ServerHandler.java
index 7be892c..d252635 100644
--- a/opends/src/server/org/opends/server/replication/server/ServerHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -50,11 +50,11 @@
 import org.opends.server.replication.protocol.AckMsg;
 import org.opends.server.replication.protocol.ChangeTimeHeartbeatMsg;
 import org.opends.server.replication.protocol.HeartbeatThread;
-import org.opends.server.replication.protocol.ProtocolSession;
 import org.opends.server.replication.protocol.ProtocolVersion;
 import org.opends.server.replication.protocol.ReplicationMsg;
 import org.opends.server.replication.protocol.ResetGenerationIdMsg;
 import org.opends.server.replication.protocol.RoutableMsg;
+import org.opends.server.replication.protocol.Session;
 import org.opends.server.replication.protocol.StartECLSessionMsg;
 import org.opends.server.replication.protocol.StartMsg;
 import org.opends.server.replication.protocol.StartSessionMsg;
@@ -88,7 +88,7 @@
    * @param providedMsg     The provided error message.
    * @param handler         The handler that manages that session.
    */
-  static protected void closeSession(ProtocolSession providedSession,
+  static protected void closeSession(Session providedSession,
       Message providedMsg, ServerHandler handler)
   {
     if (providedMsg != null)
@@ -118,7 +118,7 @@
   /**
    * The session opened with the remote server.
    */
-  protected ProtocolSession session;
+  protected Session session;
 
   /**
    * The serverURL of the remote server.
@@ -237,7 +237,7 @@
   /**
    * Creates a new server handler instance with the provided socket.
    *
-   * @param session The ProtocolSession used by the ServerHandler to
+   * @param session The Session used by the ServerHandler to
    *                 communicate with the remote entity.
    * @param queueSize The maximum number of update that will be kept
    *                  in memory by this ServerHandler.
@@ -247,7 +247,7 @@
    * @param rcvWindowSize The window size to receive from the remote server.
    */
   public ServerHandler(
-      ProtocolSession session,
+      Session session,
       int queueSize,
       String replicationServerURL,
       int replicationServerId,
@@ -271,7 +271,7 @@
     // We did not recognize the message, close session as what
     // can happen after is undetermined and we do not want the server to
     // be disturbed
-    ProtocolSession localSession = session;
+    Session localSession = session;
     if (localSession != null)
     {
       closeSession(localSession, reason, this);
@@ -366,6 +366,22 @@
           replicationServerDomain);
       reader = new ServerReader(session, this);
 
+      session.setName("Replication server RS("
+          + this.getReplicationServerId()
+          + ") session thread to " + this.toString() + " at "
+          + session.getReadableRemoteAddress());
+      session.start();
+      try
+      {
+        session.waitForStartup();
+      }
+      catch (InterruptedException e)
+      {
+        final Message message =
+            ERR_SESSION_STARTUP_INTERRUPTED.get(session.getName());
+        throw new DirectoryException(ResultCode.OTHER,
+            message, e);
+      }
       reader.start();
       writer.start();
 

--
Gitblit v1.10.0