From ea629fa971db08f2267b50522360563a8fec7f86 Mon Sep 17 00:00:00 2001
From: ludovicp <ludovicp@localhost>
Date: Thu, 27 May 2010 15:28:09 +0000
Subject: [PATCH] Fix for issues #3395 and #3998. The changes improves the replica initialization protocol, especially flow control and handling connection outage.

---
 opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java |  147 +++++++++++++++++++++++++++++++++++-------------
 1 files changed, 107 insertions(+), 40 deletions(-)

diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java b/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
index 279e6db..f152406 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -316,6 +316,15 @@
   }
 
   /**
+   * Set the generation id - for test purpose.
+   * @param generationID The generation id
+   */
+  public void setGenerationID(long generationID)
+  {
+    this.generationID = generationID;
+  }
+
+  /**
    * Gets the server url of the RS we are connected to.
    * @return The server url of the RS we are connected to
    */
@@ -727,6 +736,15 @@
     {
       this.locallyConfigured = locallyConfigured;
     }
+
+    /**
+     * Returns a string representation of this object.
+     * @return A string representation of this object.
+     */
+    public String toString()
+    {
+      return "Url:"+ this.getServerURL() + " ServerId:" + this.serverId;
+    }
   }
 
   private void connect()
@@ -859,7 +877,8 @@
         // Best found, now initialize connection to this one (handshake phase 1)
         if (debugEnabled())
           TRACER.debugInfo(
-            "phase 2 : will perform PhaseOneH with the preferred RS.");
+            "phase 2 : will perform PhaseOneH with the preferred RS="
+              + replicationServerInfo);
         replicationServerInfo = performPhaseOneHandshake(
           replicationServerInfo.getServerURL(), true);
 
@@ -2225,18 +2244,20 @@
 
   /**
    * restart the ReplicationBroker.
+   * @param infiniteTry the socket which failed
    */
-  public void reStart()
+  public void reStart(boolean infiniteTry)
   {
-    reStart(this.session);
+    reStart(this.session, infiniteTry);
   }
 
   /**
    * Restart the ReplicationServer broker after a failure.
    *
    * @param failingSession the socket which failed
+   * @param infiniteTry the socket which failed
    */
-  public void reStart(ProtocolSession failingSession)
+  public void reStart(ProtocolSession failingSession, boolean infiniteTry)
   {
 
     if (failingSession != null)
@@ -2268,6 +2289,7 @@
       rsGroupId = (byte) -1;
       rsServerId = -1;
       rsServerUrl = null;
+      session = null;
     }
     while (!this.connected && (!this.shutdown))
     {
@@ -2282,6 +2304,8 @@
         mb.append(stackTraceToSingleLineString(e));
         logError(mb.toMessage());
       }
+      if ((!connected) && (!infiniteTry))
+        break;
       if ((!connected) && (!shutdown))
       {
         try
@@ -2293,6 +2317,11 @@
         }
       }
     }
+    if (debugEnabled())
+      TRACER.debugInfo(this +
+          " end restart : connected=" + connected +
+          " with RSid=" + this.getRsServerId() +
+          " genid=" + this.generationID);
   }
 
   /**
@@ -2301,7 +2330,18 @@
    */
   public void publish(ReplicationMsg msg)
   {
-    _publish(msg, false);
+    _publish(msg, false, true);
+  }
+
+  /**
+   * Publish a message to the other servers.
+   * @param msg            The message to publish.
+   * @param retryOnFailure Whether reconnect should automatically be done.
+   * @return               Whether publish succeeded.
+   */
+  public boolean publish(ReplicationMsg msg, boolean retryOnFailure)
+  {
+    return _publish(msg, false, retryOnFailure);
   }
 
   /**
@@ -2310,15 +2350,18 @@
    */
   public void publishRecovery(ReplicationMsg msg)
   {
-    _publish(msg, true);
+    _publish(msg, true, true);
   }
 
   /**
    * Publish a message to the other servers.
    * @param msg the message to publish
    * @param recoveryMsg the message is a recovery Message
+   * @param retryOnFailure whether retry should be done on failure
+   * @return whether the message was successfully sent.
    */
-  void _publish(ReplicationMsg msg, boolean recoveryMsg)
+  boolean _publish(ReplicationMsg msg, boolean recoveryMsg,
+      boolean retryOnFailure)
   {
     boolean done = false;
 
@@ -2338,7 +2381,7 @@
             "message is not possible due to existing connection error.");
         }
 
-        return;
+        return false;
       }
 
       try
@@ -2365,7 +2408,7 @@
         // do it.
         if (!recoveryMsg & connectRequiresRecovery)
         {
-          return;
+          return false;
         }
 
         if (msg instanceof UpdateMsg)
@@ -2408,6 +2451,9 @@
         }
       } catch (IOException e)
       {
+        if (!retryOnFailure)
+          return false;
+
         // The receive threads should handle reconnection or
         // mark this broker in error. Just retry.
         synchronized (connectPhaseLock)
@@ -2435,6 +2481,7 @@
         }
       }
     }
+    return true;
   }
 
   /**
@@ -2450,7 +2497,7 @@
    */
   public ReplicationMsg receive() throws SocketTimeoutException
   {
-    return receive(false);
+    return receive(false, true, false);
   }
 
   /**
@@ -2459,22 +2506,29 @@
    * called in a single thread or protected by a locking mechanism
    * before being called.
    *
-   * @return the received message
    * @throws SocketTimeoutException if the timeout set by setSoTimeout
    *         has expired
-   * @param allowReconnectionMechanism If true, this allows the reconnection
-   * mechanism to disconnect the broker if it detects that it should reconnect
-   * to another replication server because of some criteria defined by the
-   * algorithm where we choose a suitable replication server.
+   * @param reconnectToTheBestRS Whether broker will automatically switch
+   *                             to the best suitable RS.
+   * @param reconnectOnFailure   Whether broker will automatically reconnect
+   *                             on failure.
+   * @param returnOnTopoChange   Whether broker should return TopologyMsg
+   *                             received.
+   * @return the received message
+   *
+   * @throws SocketTimeoutException if the timeout set by setSoTimeout
+   *         has expired
    */
-  public ReplicationMsg receive(boolean allowReconnectionMechanism)
+  public ReplicationMsg receive(boolean reconnectToTheBestRS,
+      boolean reconnectOnFailure, boolean returnOnTopoChange)
     throws SocketTimeoutException
   {
     while (shutdown == false)
     {
-      if (!connected)
+      if ((reconnectOnFailure) && (!connected))
       {
-        reStart(null);
+        // infinite try to reconnect
+        reStart(null, true);
       }
 
       ProtocolSession failingSession = session;
@@ -2496,11 +2550,16 @@
         {
           TopologyMsg topoMsg = (TopologyMsg) msg;
           receiveTopo(topoMsg);
-          if (allowReconnectionMechanism)
+          if (reconnectToTheBestRS)
           {
             // Reset wait time before next computation of best server
             mustRunBestServerCheckingAlgorithm = 0;
           }
+
+          // Caller wants to check what's changed
+          if (returnOnTopoChange)
+            return msg;
+
         } else if (msg instanceof StopMsg)
         {
           /*
@@ -2512,7 +2571,7 @@
             Integer.toString(serverId));
           logError(message);
           // Try to find a suitable RS
-          this.reStart(failingSession);
+          this.reStart(failingSession, true);
         } else if (msg instanceof MonitorMsg)
         {
           // This is the response to a MonitorRequest that was sent earlier or
@@ -2551,7 +2610,7 @@
           // it is still the one we are currently connected to. If not,
           // disconnect properly and let the connection algorithm re-connect to
           // best replication server
-          if (allowReconnectionMechanism)
+          if (reconnectToTheBestRS)
           {
             mustRunBestServerCheckingAlgorithm++;
             if (mustRunBestServerCheckingAlgorithm == 2)
@@ -2572,9 +2631,10 @@
                   NOTE_NEW_BEST_REPLICATION_SERVER.get(baseDn.toString(),
                   Integer.toString(serverId),
                   Integer.toString(rsServerId),
-                  rsServerUrl);
+                  rsServerUrl,
+                  Integer.toString(bestServerInfo.getServerId()));
                 logError(message);
-                reStart();
+                reStart(null, true);
               }
 
               // Reset wait time before next computation of best server
@@ -2603,10 +2663,13 @@
               Integer.toString(serverId));
             logError(message);
           }
-          this.reStart(failingSession);
+          if (reconnectOnFailure)
+            reStart(failingSession, true);
+          else
+            break; // does not seem necessary to explicitely disconnect ..
         }
       }
-    }
+    } // while !shutdown
     return null;
   }
 
@@ -2676,11 +2739,10 @@
   public void stop()
   {
     if (debugEnabled())
-    {
       debugInfo("ReplicationBroker " + serverId + " is stopping and will" +
         " close the connection to replication server " + rsServerId + " for" +
         " domain " + baseDn);
-    }
+
     stopRSHeartBeatMonitoring();
     stopChangeTimeHeartBeatPublishing();
     replicationServer = "stopped";
@@ -2690,25 +2752,17 @@
     rsServerId = -1;
     rsServerUrl = null;
 
-    if (session != null)
+    try
     {
       if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
       {
         // V4 protocol introduces a StopMsg to properly end communications
-        try
-        {
           session.publish(new StopMsg());
-        } catch (IOException ioe)
-        {
-          // Anyway, going to close session, so nothing to do
-        }
       }
-      try
-      {
-        session.close();
-      } catch (IOException e)
-      {
-      }
+      session.close();
+    } catch (Exception e)
+    {
+      // Anyway, going to close session, so nothing to do
     }
   }
 
@@ -2979,6 +3033,9 @@
    */
   public void receiveTopo(TopologyMsg topoMsg)
   {
+    if (debugEnabled())
+      TRACER.debugInfo(this + " receive TopologyMsg=" + topoMsg);
+
     // Store new DS list
     dsList = topoMsg.getDsList();
 
@@ -3100,4 +3157,14 @@
   {
     connectRequiresRecovery = b;
   }
+
+  /**
+   * Returns whether the broker is shutting down.
+   * @return whether the broker is shutting down.
+   */
+  public boolean shuttingDown()
+  {
+    return shutdown;
+  }
+
 }

--
Gitblit v1.10.0