From 13231f3def739a90b963d42853dea768789925f1 Mon Sep 17 00:00:00 2001
From: gbellato <gbellato@localhost>
Date: Tue, 05 Jun 2007 14:59:58 +0000
Subject: [PATCH] Fix for issue 1764 : infinite loop when no replication server up and running.

---
 opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java |  129 ++++++++++++++++++++++--------------------
 1 files changed, 67 insertions(+), 62 deletions(-)

diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
index 94cbe62..229f9e9 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
@@ -207,8 +207,10 @@
     }
 
     boolean checkState = true;
-    while ((!connected) && (!shutdown))
+    boolean receivedResponse = true;
+    while ((!connected) && (!shutdown) && (receivedResponse))
     {
+      receivedResponse = false;
       for (String server : servers)
       {
         int separator = server.lastIndexOf(':');
@@ -243,6 +245,7 @@
            */
           session.setSoTimeout(1000);
           startMsg = (ReplServerStartMessage) session.receive();
+          receivedResponse = true;
 
           /*
            * We have sent our own protocol version to the replication server.
@@ -274,6 +277,7 @@
             maxSendWindow = startMsg.getWindowSize();
             this.sendWindow = new Semaphore(maxSendWindow);
             connected = true;
+            startHeartBeat();
             break;
           }
           else
@@ -333,6 +337,7 @@
                 {
                   publish(replayOp.generateMessage());
                 }
+                startHeartBeat();
                 break;
               }
             }
@@ -375,51 +380,48 @@
         }
       }
 
-      if (!connected)
+      if ((!connected) && (checkState == true) && receivedResponse)
       {
-        if (checkState == true)
+        /*
+         * We could not find a replicationServer that has seen all the
+         * changes that this server has already processed, start again
+         * the loop looking for any replicationServer.
+         */
+        int    msgID   = MSGID_COULD_NOT_FIND_CHANGELOG_WITH_MY_CHANGES;
+        String message = getMessage(msgID);
+        logError(ErrorLogCategory.SYNCHRONIZATION,
+            ErrorLogSeverity.NOTICE,
+            message, msgID);
+        try
         {
-          /*
-           * We could not find a replicationServer that has seen all the
-           * changes that this server has already processed, start again
-           * the loop looking for any replicationServer.
-           */
-          try
-          {
-            Thread.sleep(500);
-          } catch (InterruptedException e)
-          {
-            // TODO Auto-generated catch block
-            e.printStackTrace();
-          }
-          checkState = false;
-          int    msgID   = MSGID_COULD_NOT_FIND_CHANGELOG_WITH_MY_CHANGES;
-          String message = getMessage(msgID);
-          logError(ErrorLogCategory.SYNCHRONIZATION,
-              ErrorLogSeverity.NOTICE,
-              message, msgID);
-        }
-        else
+          Thread.sleep(500);
+        } catch (InterruptedException e)
         {
-          /*
-           * This server could not find any replicationServer
-           * Let's wait a little and try again.
-           */
-          checkState = false;
-          int    msgID   = MSGID_COULD_NOT_FIND_CHANGELOG;
-          String message = getMessage(msgID);
-          logError(ErrorLogCategory.SYNCHRONIZATION,
-              ErrorLogSeverity.NOTICE, message, msgID);
-          try
-          {
-            Thread.sleep(1000);
-          } catch (InterruptedException e)
-          {
-          }
         }
+        checkState = false;
       }
     }
 
+    if (!connected)
+    {
+      /*
+       * This server could not find any replicationServer
+       * It's going to start in degraded mode.
+       * Log a message
+       */
+      checkState = false;
+      int    msgID   = MSGID_COULD_NOT_FIND_CHANGELOG;
+      String message = getMessage(msgID);
+      logError(ErrorLogCategory.SYNCHRONIZATION,
+          ErrorLogSeverity.NOTICE, message, msgID);
+    }
+  }
+
+  /**
+   * Start the heartbeat monitor thread.
+   */
+  private void startHeartBeat()
+  {
     // Start a heartbeat monitor thread.
     if (heartbeatInterval > 0)
     {
@@ -432,17 +434,27 @@
 
 
   /**
+   * restart the ReplicationBroker.
+   */
+  private void reStart()
+  {
+    reStart(null);
+  }
+
+  /**
    * Restart the ReplicationServer broker after a failure.
    *
    * @param failingSession the socket which failed
    */
   private void reStart(ProtocolSession failingSession)
   {
-    numLostConnections++;
-
     try
     {
-      failingSession.close();
+      if (failingSession != null)
+      {
+        failingSession.close();
+        numLostConnections++;
+      }
     } catch (IOException e1)
     {
       // ignore
@@ -465,6 +477,16 @@
                  ErrorLogSeverity.SEVERE_ERROR,
                  message, msgID);
       }
+      if ((!connected) && (!shutdown))
+      {
+        try
+        {
+          Thread.sleep(500);
+        } catch (InterruptedException e)
+        {
+          // ignore
+        }
+      }
     }
   }
 
@@ -513,6 +535,9 @@
   {
     while (shutdown == false)
     {
+      if (!connected)
+        reStart();
+
       ProtocolSession failingSession = session;
       try
       {
@@ -575,26 +600,6 @@
   }
 
   /**
-   * restart the server after a suspension.
-   * @throws Exception in case of errors.
-   */
-  public void restartReceive() throws Exception
-  {
-    // TODO Auto-generated method stub
-
-  }
-
-  /**
-   * Suspend message reception.
-   * @throws Exception in case of errors.
-   */
-  public void suspendReceive() throws Exception
-  {
-    // TODO Auto-generated method stub
-
-  }
-
-  /**
    * Set a timeout value.
    * With this option set to a non-zero value, calls to the receive() method
    * block for only this amount of time after which a

--
Gitblit v1.10.0