From 11859d9a6e466bab4ab73e1e46d092c6052acf68 Mon Sep 17 00:00:00 2001
From: coulbeck <coulbeck@localhost>
Date: Fri, 02 Feb 2007 21:50:10 +0000
Subject: [PATCH] These changes are for issue 787: LDAP server need to detect failure of changelog servers

---
 opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/ServerHandler.java |   44 ++++++++++++++++++++++++++++++++++++++++++++
 1 files changed, 44 insertions(+), 0 deletions(-)

diff --git a/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/ServerHandler.java b/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/ServerHandler.java
index 6c74065..27a6be2 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/ServerHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/ServerHandler.java
@@ -61,6 +61,7 @@
 import org.opends.server.synchronization.protocol.SynchronizationMessage;
 import org.opends.server.synchronization.protocol.UpdateMessage;
 import org.opends.server.synchronization.protocol.WindowMessage;
+import org.opends.server.synchronization.protocol.HeartbeatThread;
 import org.opends.server.util.TimeThread;
 
 /**
@@ -108,6 +109,17 @@
                                        // be stopped from sending messsages.
   private int saturationCount = 0;
 
+  /**
+   * The time in milliseconds between heartbeats from the synchronization
+   * server.  Zero means heartbeats are off.
+   */
+  private long heartbeatInterval = 0;
+
+  /**
+   * The thread that will send heartbeats.
+   */
+  HeartbeatThread heartbeatThread = null;
+
   private static Map<ChangeNumber, ChangelogAckMessageList>
    changelogsWaitingAcks = new HashMap<ChangeNumber, ChangelogAckMessageList>();
 
@@ -173,6 +185,7 @@
         maxReceiveQueue = receivedMsg.getMaxReceiveQueue();
         maxSendDelay = receivedMsg.getMaxSendDelay();
         maxSendQueue = receivedMsg.getMaxSendQueue();
+        heartbeatInterval = receivedMsg.getHeartbeatInterval();
 
         if (maxReceiveQueue > 0)
           restartReceiveQueue = (maxReceiveQueue > 1000 ?
@@ -199,6 +212,12 @@
                               maxSendDelay);
         else
           restartSendDelay = 0;
+
+        if (heartbeatInterval < 0)
+        {
+          heartbeatInterval = 0;
+        }
+
         serverIsLDAPserver = true;
 
         changelogCache = changelog.getChangelogCache(this.baseDn);
@@ -256,6 +275,16 @@
 
       reader.start();
       writer.start();
+
+      // Create a thread to send heartbeat messages.
+      if (heartbeatInterval > 0)
+      {
+        heartbeatThread = new HeartbeatThread("Synchronization Heartbeat",
+                                              session, heartbeatInterval);
+        heartbeatThread.start();
+      }
+
+
       DirectoryServer.deregisterMonitorProvider(getMonitorInstanceName());
       DirectoryServer.registerMonitorProvider(this);
     }
@@ -853,6 +882,12 @@
       msgQueue.notifyAll();
     }
 
+    // Stop the heartbeat thread.
+    if (heartbeatThread != null)
+    {
+      heartbeatThread.shutdown();
+    }
+
     DirectoryServer.deregisterMonitorProvider(getMonitorInstanceName());
   }
 
@@ -1225,4 +1260,13 @@
   {
     sendWindow.release(windowMsg.getNumAck());
   }
+
+  /**
+   * Get our heartbeat interval.
+   * @return Our heartbeat interval.
+   */
+  public long getHeartbeatInterval()
+  {
+    return heartbeatInterval;
+  }
 }

--
Gitblit v1.10.0