From 0f942fb22a49820dacbc16bd9769fbb479e0e4f2 Mon Sep 17 00:00:00 2001
From: coulbeck <coulbeck@localhost>
Date: Fri, 02 Feb 2007 21:50:10 +0000
Subject: [PATCH] These changes are for issue 787: LDAP server need to detect failure of changelog servers

---
 opends/src/server/org/opends/server/synchronization/plugin/ChangelogBroker.java |   66 ++++++++++++++++++++++++++++-----
 1 files changed, 56 insertions(+), 10 deletions(-)

diff --git a/opends/src/server/org/opends/server/synchronization/plugin/ChangelogBroker.java b/opends/src/server/org/opends/server/synchronization/plugin/ChangelogBroker.java
index 5a7d784..5239d75 100644
--- a/opends/src/server/org/opends/server/synchronization/plugin/ChangelogBroker.java
+++ b/opends/src/server/org/opends/server/synchronization/plugin/ChangelogBroker.java
@@ -94,6 +94,24 @@
   private int timeout = 0;
 
   /**
+   * The time in milliseconds between heartbeats from the synchronization
+   * server.  Zero means heartbeats are off.
+   */
+  private long heartbeatInterval = 0;
+
+
+  /**
+   * A thread to monitor heartbeats on the session.
+   */
+  private HeartbeatMonitor heartbeatMonitor = null;
+
+  /**
+   * The number of times the connection was lost.
+   */
+  private int numLostConnections = 0;
+
+
+  /**
    * Creates a new Changelog Broker for a particular SynchronizationDomain.
    *
    * @param state The ServerState that should be used by this broker
@@ -110,10 +128,12 @@
    *                     the changelog server.
    * @param maxSendDelay The maximum send delay to use on the changelog server.
    * @param window The size of the send and receive window to use.
+   * @param heartbeatInterval The interval between heartbeats requested of the
+   * changelog server, or zero if no heartbeats are requested.
    */
   public ChangelogBroker(ServerState state, DN baseDn, short serverID,
       int maxReceiveQueue, int maxReceiveDelay, int maxSendQueue,
-      int maxSendDelay, int window)
+      int maxSendDelay, int window, long heartbeatInterval)
   {
     this.baseDn = baseDn;
     this.serverID = serverID;
@@ -127,6 +147,7 @@
     this.rcvWindow = window;
     this.maxRcvWindow = window;
     this.halfRcvWindow = window/2;
+    this.heartbeatInterval = heartbeatInterval;
   }
 
   /**
@@ -157,7 +178,7 @@
 
 
   /**
-   * Connect the Changelog server to other servers.
+   * Connect to a Changelog server.
    *
    * @throws NumberFormatException address was invalid
    * @throws IOException error during connection phase
@@ -166,6 +187,13 @@
   {
     ChangelogStartMessage startMsg;
 
+    // Stop any existing heartbeat monitor from a previous session.
+    if (heartbeatMonitor != null)
+    {
+      heartbeatMonitor.shutdown();
+      heartbeatMonitor = null;
+    }
+
     boolean checkState = true;
     while( !connected)
     {
@@ -191,9 +219,9 @@
           /*
            * Send our ServerStartMessage.
            */
-          ServerStartMessage msg = new ServerStartMessage(  serverID, baseDn,
+          ServerStartMessage msg = new ServerStartMessage(serverID, baseDn,
               maxReceiveDelay, maxReceiveQueue, maxSendDelay, maxSendQueue,
-              halfRcvWindow*2, state);
+              halfRcvWindow*2, heartbeatInterval, state);
           session.publish(msg);
 
 
@@ -369,6 +397,15 @@
         }
       }
     }
+
+    // Start a heartbeat monitor thread.
+    if (heartbeatInterval > 0)
+    {
+      heartbeatMonitor =
+           new HeartbeatMonitor("Synchronization Heartbeat Monitor", session,
+                                heartbeatInterval);
+      heartbeatMonitor.start();
+    }
   }
 
 
@@ -379,6 +416,8 @@
    */
   private void reStart(ProtocolSession failingSession)
   {
+    numLostConnections++;
+
     try
     {
       failingSession.close();
@@ -445,7 +484,7 @@
   /**
    * Receive a message.
    * @return the received message
-   * @throws SocketTimeoutException if the tiemout set by setSoTimeout
+   * @throws SocketTimeoutException if the timeout set by setSoTimeout
    *         has expired
    */
   public SynchronizationMessage receive() throws SocketTimeoutException
@@ -474,13 +513,11 @@
           }
           return msg;
         }
+      } catch (SocketTimeoutException e)
+      {
+        throw e;
       } catch (Exception e)
       {
-        if (e instanceof SocketTimeoutException)
-        {
-          SocketTimeoutException e1 = (SocketTimeoutException) e;
-          throw e1;
-        }
         if (shutdown == false)
         {
           synchronized (lock)
@@ -631,4 +668,13 @@
     else
       return 0;
   }
+
+  /**
+   * Get the number of times the connection was lost.
+   * @return The number of times the connection was lost.
+   */
+  public int getNumLostConnections()
+  {
+    return numLostConnections;
+  }
 }

--
Gitblit v1.10.0