From 11859d9a6e466bab4ab73e1e46d092c6052acf68 Mon Sep 17 00:00:00 2001
From: coulbeck <coulbeck@localhost>
Date: Fri, 02 Feb 2007 21:50:10 +0000
Subject: [PATCH] These changes are for issue 787: LDAP server need to detect failure of changelog servers
---
opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/ServerHandler.java | 44 ++++++++++++++++++++++++++++++++++++++++++++
1 files changed, 44 insertions(+), 0 deletions(-)
diff --git a/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/ServerHandler.java b/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/ServerHandler.java
index 6c74065..27a6be2 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/ServerHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/ServerHandler.java
@@ -61,6 +61,7 @@
import org.opends.server.synchronization.protocol.SynchronizationMessage;
import org.opends.server.synchronization.protocol.UpdateMessage;
import org.opends.server.synchronization.protocol.WindowMessage;
+import org.opends.server.synchronization.protocol.HeartbeatThread;
import org.opends.server.util.TimeThread;
/**
@@ -108,6 +109,17 @@
// be stopped from sending messsages.
private int saturationCount = 0;
+ /**
+ * The time in milliseconds between heartbeats from the synchronization
+ * server. Zero means heartbeats are off.
+ */
+ private long heartbeatInterval = 0;
+
+ /**
+ * The thread that will send heartbeats.
+ */
+ HeartbeatThread heartbeatThread = null;
+
private static Map<ChangeNumber, ChangelogAckMessageList>
changelogsWaitingAcks = new HashMap<ChangeNumber, ChangelogAckMessageList>();
@@ -173,6 +185,7 @@
maxReceiveQueue = receivedMsg.getMaxReceiveQueue();
maxSendDelay = receivedMsg.getMaxSendDelay();
maxSendQueue = receivedMsg.getMaxSendQueue();
+ heartbeatInterval = receivedMsg.getHeartbeatInterval();
if (maxReceiveQueue > 0)
restartReceiveQueue = (maxReceiveQueue > 1000 ?
@@ -199,6 +212,12 @@
maxSendDelay);
else
restartSendDelay = 0;
+
+ if (heartbeatInterval < 0)
+ {
+ heartbeatInterval = 0;
+ }
+
serverIsLDAPserver = true;
changelogCache = changelog.getChangelogCache(this.baseDn);
@@ -256,6 +275,16 @@
reader.start();
writer.start();
+
+ // Create a thread to send heartbeat messages.
+ if (heartbeatInterval > 0)
+ {
+ heartbeatThread = new HeartbeatThread("Synchronization Heartbeat",
+ session, heartbeatInterval);
+ heartbeatThread.start();
+ }
+
+
DirectoryServer.deregisterMonitorProvider(getMonitorInstanceName());
DirectoryServer.registerMonitorProvider(this);
}
@@ -853,6 +882,12 @@
msgQueue.notifyAll();
}
+ // Stop the heartbeat thread.
+ if (heartbeatThread != null)
+ {
+ heartbeatThread.shutdown();
+ }
+
DirectoryServer.deregisterMonitorProvider(getMonitorInstanceName());
}
@@ -1225,4 +1260,13 @@
{
sendWindow.release(windowMsg.getNumAck());
}
+
+ /**
+ * Get our heartbeat interval.
+ * @return Our heartbeat interval.
+ */
+ public long getHeartbeatInterval()
+ {
+ return heartbeatInterval;
+ }
}
--
Gitblit v1.10.0